package pcsv

import (
	"encoding/csv"
	"io"
	"log"
	"os"
	"runtime"
	"sync"

	ants "github.com/panjf2000/ants/v2"
)

// PCsvReader reads a list of CSV files in parallel and streams their data
// records into a single channel; see Read. ReadingState must be non-nil
// before Read is called, because per-file progress is written into it
// (NewPCsvReader allocates it).
type PCsvReader struct {
	// WithHeader makes the first record of every file be read and dropped.
	WithHeader bool
	// FieldsPerRecord is copied into csv.Reader.FieldsPerRecord for each file.
	FieldsPerRecord int
	// FilePaths lists the CSV files to read.
	FilePaths []string
	// ReadingState maps a file path to the number of records read from that
	// file so far, header included. It is updated from pool worker goroutines
	// while Read runs, so only inspect it once Read has returned.
	ReadingState map[string]int // map[filepath]line
}

// NewPCsvReader returns a reader for filePaths with an empty, ready-to-use
// ReadingState map. FieldsPerRecord is left at its zero value, which
// encoding/csv interprets as "the first record fixes the field count".
func NewPCsvReader(filePaths []string, withHeader bool) *PCsvReader {
	pr := new(PCsvReader)
	pr.FilePaths = filePaths
	pr.WithHeader = withHeader
	pr.ReadingState = make(map[string]int)
	return pr
}

// Read streams the data records of every file in pr.FilePaths into dataCh and
// closes dataCh once all files have been read.
//
// Files are read concurrently on a pool of runtime.NumCPU() workers, one file
// per task. Records of different files may therefore interleave in dataCh,
// while the records of any single file arrive in file order. Read blocks until
// every file is done, so unless dataCh is buffered enough to hold every record
// it must be drained by another goroutine while Read runs. Failing to start
// the pool, to schedule a file, or to read a file terminates the process via
// log.Fatal.
func (pr *PCsvReader) Read(dataCh chan []string) {
	defer close(dataCh)
	var wg sync.WaitGroup

	f := func(csvPath interface{}) {
		// Deferred so the WaitGroup is released even if readOneCsv panics: ants
		// workers recover panics by default (no PanicHandler configured here),
		// and a skipped Done would leave wg.Wait, and hence close(dataCh),
		// blocked forever.
		defer wg.Done()
		readOneCsv(csvPath.(string), pr.WithHeader, pr.FieldsPerRecord, pr.ReadingState, dataCh)
	}

	p, err := ants.NewPoolWithFunc(runtime.NumCPU(), f)
	if err != nil {
		log.Fatalf("Start Gouroutine Pool error: %s\n", err)
	}
	defer p.Release()

	for _, csvPath := range pr.FilePaths {
		wg.Add(1)
		if err = p.Invoke(csvPath); err != nil {
			log.Fatalf("Invoke readOneCsv error: %s\n", err)
		}
	}

	// Every record has been sent once all tasks finish; the deferred close
	// then signals end-of-stream to the consumer.
	wg.Wait()
}

// readingStateMu serializes updates to the reading-state map shared by
// readOneCsv calls. Read runs readOneCsv on several pool workers at once
// against the same map, and unsynchronized concurrent map writes abort the
// program ("fatal error: concurrent map writes").
var readingStateMu sync.Mutex

// setReadingState records, under readingStateMu, that line records of csvPath
// have been consumed.
func setReadingState(rs map[string]int, csvPath string, line int) {
	readingStateMu.Lock()
	defer readingStateMu.Unlock()
	rs[csvPath] = line
}

// readOneCsv streams the data records of the CSV file at csvPath to dataCh.
//
// If withHeader is set, the first record is read and discarded. An empty file
// yields no records, with or without a header. fieldPerRecord is forwarded to
// csv.Reader.FieldsPerRecord. rs[csvPath] tracks how many records (header
// included) have been read so far; rs may be shared with concurrent callers.
// dataCh is not closed here. Any open or parse error terminates the process
// via log.Fatal.
func readOneCsv(csvPath string, withHeader bool, fieldPerRecord int, rs map[string]int, dataCh chan []string) {
	fi, err := os.Open(csvPath)
	if err != nil {
		log.Fatal(err)
	}
	defer fi.Close()

	csvReader := csv.NewReader(fi)
	csvReader.FieldsPerRecord = fieldPerRecord

	// line is this call's private copy of rs[csvPath]; the shared map is only
	// touched through setReadingState.
	line := 0
	setReadingState(rs, csvPath, line)

	// The header carries no data: skip it.
	if withHeader {
		if _, err := csvReader.Read(); err != nil {
			if err == io.EOF {
				// Empty file: no header and no records. Handle it like an
				// empty header-less file instead of aborting.
				return
			}
			log.Fatal(err)
		}
		line++
		setReadingState(rs, csvPath, line)
	}

	for {
		record, err := csvReader.Read()
		if err == io.EOF {
			return
		}
		if err != nil {
			log.Fatalf("[%s] At LINE: %d, readOneCsv error: %s\n", csvPath, line, err)
		}

		line++
		setReadingState(rs, csvPath, line)
		dataCh <- record
	}
}
